dsv2Filter #10
Conversation
Force-pushed from `56cc73b` to `3372321`.
… Arrow on JDK9+

### What changes were proposed in this pull request?

This PR aims to add `io.netty.tryReflectionSetAccessible=true` to the testing configuration for JDK11, because this is an officially documented requirement of Apache Arrow. The Apache Arrow community documented this requirement at `0.15.0` ([ARROW-6206](apache/arrow#5078)):

> #### For java 9 or later, should set "-Dio.netty.tryReflectionSetAccessible=true".
> This fixes `java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.(long, int) not available` thrown by netty.

### Why are the changes needed?

After ARROW-3191, the Arrow Java library requires the property `io.netty.tryReflectionSetAccessible` to be set to `true` for JDK >= 9. After apache#26133, the JDK11 Jenkins jobs started to fail:

- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-3.2-jdk-11/676/
- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-3.2-jdk-11/677/
- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-3.2-jdk-11/678/

```scala
Previous exception in task: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
	io.netty.util.internal.PlatformDependent.directBuffer(PlatformDependent.java:473)
	io.netty.buffer.NettyArrowBuf.getDirectBuffer(NettyArrowBuf.java:243)
	io.netty.buffer.NettyArrowBuf.nioBuffer(NettyArrowBuf.java:233)
	io.netty.buffer.ArrowBuf.nioBuffer(ArrowBuf.java:245)
	org.apache.arrow.vector.ipc.message.ArrowRecordBatch.computeBodyLength(ArrowRecordBatch.java:222)
```

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Passed the Jenkins tests with JDK11.

Closes apache#26552 from dongjoon-hyun/SPARK-ARROW-JDK11.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
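As an aside, here is a minimal sketch of what the flag controls, using a hypothetical helper object (`NettyReflectionFlag` is not part of the PR; the property name comes from the Arrow documentation quoted above):

```scala
// Hypothetical helper, not part of this PR. Setting the property in code only
// takes effect if it runs before any netty class is loaded; the commit instead
// passes -Dio.netty.tryReflectionSetAccessible=true on the test JVM command
// line, which is the reliable route on JDK 9+.
object NettyReflectionFlag {
  def ensureSet(): Unit = {
    if (System.getProperty("io.netty.tryReflectionSetAccessible") == null) {
      System.setProperty("io.netty.tryReflectionSetAccessible", "true")
    }
  }
}
```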
viirya left a comment:
So basically, how to push down nested column predicates is implemented by each v2 data source. The main changes here look like:

- Add the v2 `Filter` and its subclasses.
- Translate Catalyst predicates to v2 `Filter`s.
- Replace the v1 `Filter` with the v2 `Filter` in the ORC filter helper.
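A rough sketch of that translation step, with simplified stand-in types (illustrative names only, not Spark's actual classes; the real rule handles many more predicate shapes):

```scala
// Simplified stand-ins for the Catalyst and v2 filter types; illustrative only.
sealed trait CatalystPredicate
case class CatalystEqualTo(column: String, value: Any) extends CatalystPredicate

case class NamedReference(fieldNames: Seq[String])
sealed trait FilterV2
case class EqualToV2(ref: NamedReference, value: Any) extends FilterV2

// Translate a Catalyst predicate into a v2 Filter when the shape is supported;
// None means the predicate cannot be pushed down. Nested columns survive the
// translation because NamedReference keeps the full field-name path.
def translate(p: CatalystPredicate): Option[FilterV2] = p match {
  case CatalystEqualTo(column, v) =>
    Some(EqualToV2(NamedReference(column.split('.').toSeq), v))
  case _ => None
}
```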
```java
import org.apache.spark.sql.connector.expressions.NamedReference;

@Experimental
public abstract class FilterV2 {
```
The package is already `v2`. Do we need to add `V2` to the name?
```java
 */
public abstract NamedReference[] references();

protected NamedReference[] findReferences(Object valve) {
```
valve -> value?
```scala
/**
 * Methods that can be shared when upgrading the built-in Hive.
 */
trait OrcFiltersBase {
```
Is this for v2?
```scala
/**
 * The base class file format that is based on text file.
 */
abstract class TextBasedFileFormat extends FileFormatV2 {
```
I don't seem to see where `TextBasedFileFormat` and `FileFormatV2` are used?
```java
 * @throws IllegalArgumentException If the delete is rejected due to required effort
 */
void deleteWhere(Filter[] filters);
void deleteWhere(FilterV2[] filters);
```
I think that a good way to switch between v1 filters and v2 filters is to add both methods and convert from v2 to v1 in a default implementation of the v2 version. That's an easy way for people to update to the new filter API.
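A minimal sketch of that idea, with stand-in types (illustrative only, not the actual Spark API):

```scala
// Stand-ins so the sketch is self-contained; not Spark's real classes.
trait Filter
trait FilterV2

trait SupportsDeleteSketch {
  // Existing v1 method that current sources already implement.
  def deleteWhere(filters: Array[Filter]): Unit

  // New v2 overload whose default body converts v2 filters to v1 and
  // delegates, so existing implementations keep working unchanged.
  def deleteWhere(filters: Array[FilterV2]): Unit =
    deleteWhere(filters.map(toV1))

  // Hypothetical conversion; a real implementation needs a v2 -> v1 mapping.
  protected def toV1(filter: FilterV2): Filter
}
```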
```java
import org.apache.spark.sql.connector.expressions.NamedReference;

@Experimental
public abstract class FilterV2 {
```
Do we want to use Filter or should we use Predicate for expressions that evaluate to a boolean?
```scala
 * @since 3.0.0
 */
@Experimental
case class EqualTo(ref: NamedReference, value: Any) extends FilterV2 {
```
Can FilterV2 extend the v2 Expression base?
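Roughly what that could look like, with stand-ins for the connector interfaces (a sketch only, not the proposed API):

```scala
// Stand-ins for the v2 connector interfaces; illustrative only.
trait Expression
trait NamedReference extends Expression

// FilterV2 as a specialization of the v2 Expression base, so filters can
// compose with other connector expressions.
abstract class FilterV2 extends Expression {
  def references: Array[NamedReference]
}
```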
```scala
 */
@Experimental
case class EqualTo(ref: NamedReference, value: Any) extends FilterV2 {
  override def references: Array[NamedReference] = Array(ref) ++ findReferences(value)
```
Why is `value` an `Any`? Shouldn't it be an expression (like the v2 `Literal`)?
Also, in Iceberg expressions we've updated `ref` to be a `Term` instead of a `Reference`. Both `Reference` and `Transform` are terms, which allows us to express that the value of a transformed reference is equal to something. That gives us the ability to express `date(ts) = '2020-01-17'`, for example.
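A compact sketch of the `Term` idea (names modeled loosely on Iceberg's expressions, not its actual API):

```scala
object TermSketch {
  // A term is anything a predicate can compare: a plain column reference,
  // or a transform applied to another term.
  sealed trait Term
  final case class Reference(name: String) extends Term
  final case class Transform(function: String, input: Term) extends Term

  final case class EqualTo(term: Term, value: Any)

  // Expresses date(ts) = '2020-01-17': equality on a transformed reference.
  val example: EqualTo = EqualTo(Transform("date", Reference("ts")), "2020-01-17")
}
```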
```scala
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2
```
I think this should be in the `connector.expressions` package.
```scala
/**
 * Used to read and write data stored in files to/from the [[InternalRow]] format.
 */
trait FileFormatV2 {
```
Since the API already supports v1 Filter, I don't think we need to make these changes. We should just continue to support the v1 filters for older sources. That decouples these changes from updates to the file sources.
@dbtsai, this looks like a great start to me. I'd really like to see a v2 API for predicates/filters. One thing that's missing is that the v2 API is written as Java interfaces. Spark has its own implementations that are case classes, but we do need Java interfaces defined for the new filter expressions. I'd also recommend creating extractor functions like the ones we created to work with transforms. Those allow us to seamlessly use the Spark internal class names even if the source has returned a different implementation of the Java interface.
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.